Skip to content

Persist task tracker#1949

Open
zhoujh01 wants to merge 2 commits intomainfrom
persist_task_tracker
Open

Persist task tracker#1949
zhoujh01 wants to merge 2 commits intomainfrom
persist_task_tracker

Conversation

@zhoujh01
Copy link
Copy Markdown
Collaborator

@zhoujh01 zhoujh01 commented May 9, 2026

OpenViking Task Tracker 持久化设计与当前实现说明

1. 背景

OpenViking 的异步任务跟踪由 TaskTracker 提供,典型任务包括:

  • session_commit
  • add_resource
  • add_skill
  • admin_reindex

这些接口会返回 task_id,调用方再通过下面两个接口查询状态:

  • GET /api/v1/tasks/{task_id}
  • GET /api/v1/tasks

原始实现只有进程内内存映射:

  • task_id -> TaskRecord

这会带来两个问题:

  1. 多实例部署时,一个实例返回的 task_id,请求打到另一个实例就查不到。
  2. 服务重启后,历史任务状态随内存一起丢失。

当前实现已经把 TaskTracker 改造成“接口基本不变、存储后端可插拔”的结构,同时支持:

  • memory
  • persistent

其中 persistent 方案可以满足:

  1. 集群内跨实例查询同一个 task_id
  2. 服务重启后继续查询已落盘任务
  3. 后续异步阶段继续更新同一个 task_id

2. 设计目标

当前实现的目标如下:

  1. 不破坏 TaskTracker 的核心对外能力。
  2. 允许通过配置在 memorypersistent 后端之间切换。
  3. persistent 模式下,任务状态对多实例可见。
  4. 后续异步阶段可以基于 task_id + account_id + user_id 继续更新同一任务。
  5. 保持实现简单,不引入额外的分布式协调复杂度。

3. 非目标

当前实现明确不处理下面这些复杂问题:

  1. 不做持久化任务自动清理。
  2. 不做实例宕机后自动把任务改成 failed
  3. 不做 lease / heartbeat / stale detection。
  4. 不做任务接管或故障转移。
  5. 不做 CAS 或分布式锁,允许简单的 last-write-wins。

4. 当前总体架构

4.1 组件

当前实现涉及三个核心部分:

  1. TaskTracker

    • 文件:openviking/service/task_tracker.py
    • 负责任务生命周期、内存缓存、权限过滤、TTL 清理
  2. TaskStore

    • 文件:openviking/service/task_store.py
    • 负责底层任务存储抽象
  3. StorageConfig.build_task_tracker()

    • 文件:openviking_cli/utils/config/storage_config.py
    • 负责根据配置构建 TaskTracker

4.2 存储后端

TaskStore 当前有两种实现:

  1. InMemoryTaskStore
  2. PersistentTaskStore

TaskTracker 内部既保留:

  • 当前进程内 _tasks: Dict[str, TaskRecord]

也持有:

  • self._store

其中:

  • memory 模式下,self._storeInMemoryTaskStore
  • persistent 模式下,self._storePersistentTaskStore

5. 配置方式

当前配置项如下:

{
  "storage": {
    "task_tracker": {
      "backend": "memory"
    }
  }
}

支持值:

  • memory
  • persistent

5.1 默认值

当前默认值是:

{
  "storage": {
    "task_tracker": {
      "backend": "memory"
    }
  }
}

也就是说:

  • 默认行为仍然是单进程内存任务跟踪
  • 如果需要跨实例 / 重启后可查,必须显式改成 persistent

5.2 构建逻辑

当前构建逻辑是:

  • backend == "memory" -> TaskTracker()
  • backend == "persistent" -> TaskTracker(store=PersistentTaskStore(agfs))

6. 数据模型

6.1 TaskRecord

当前 TaskRecord 字段如下:

task_id: str
task_type: str
status: TaskStatus
created_at: float
updated_at: float
resource_id: Optional[str]
account_id: Optional[str]
user_id: Optional[str]
result: Optional[Dict[str, Any]]
error: Optional[str]

说明:

  1. account_id / user_id 是任务归属信息。
  2. TaskRecord.to_dict() 对外返回时会去掉 account_id / user_id
  3. 当前没有 schema_version 字段。

6.2 持久化 JSON 结构

持久化文件内部直接存 TaskRecord 的 JSON 表达,示例如下:

{
  "task_id": "5f0ec6f0-9e69-4d32-8dc7-2e5f4c2efabc",
  "task_type": "add_resource",
  "status": "completed",
  "created_at": 1778300000.123,
  "updated_at": 1778300002.456,
  "resource_id": "viking://resources/demo",
  "account_id": "acme",
  "user_id": "alice",
  "result": {
    "root_uri": "viking://resources/demo"
  },
  "error": null
}

7. 持久化路径设计

7.1 当前物理路径

当前 PersistentTaskStore 的落盘路径是:

/local/{account_id}/tasks/{user_id}/{task_id}.json

例如:

/local/acme/tasks/alice/a59daadb-c164-446a-86a2-361ac65b2614.json

7.2 这样设计的原因

当前接口查询语义是按 account_id + user_id 过滤,因此路径上也带上 user_id,这样更一致:

  1. account_id 负责租户级隔离
  2. user_id 负责租户内用户级分桶
  3. task_id 是该用户任务目录下的唯一键

相较于旧版的:

/local/{account_id}/tasks/{task_id}.json

当前路径更符合现有接口行为。

7.3 目录层次

当前目录结构含义如下:

/local/{account_id}/tasks/                  # 某个 account 的 task 根目录
/local/{account_id}/tasks/{user_id}/       # 某个 user 的 task 子目录
/local/{account_id}/tasks/{user_id}/{task_id}.json

7.4 保留目录约束

tasks 被视为内部保留目录,不应被当作普通业务资源使用。

当前实现里:

  • openviking/storage/viking_fs.py

已经将 tasks 视为内部保留名称的一部分,用于避免被普通资源视图误用。

8. TaskStore 抽象

8.1 当前接口

当前 TaskStore 接口如下:

  1. create(task)
  2. update(task)
  3. get(task_id, account_id=None, user_id=None)
  4. list(account_id, user_id=None)
  5. delete(task_id, account_id, user_id=None)

8.2 InMemoryTaskStore

InMemoryTaskStore 的特点:

  1. 数据仅保存在当前进程内存
  2. 支持按 account_id / user_id 过滤
  3. 重启即丢失
  4. 会配合 TaskTracker 的 TTL 清理删除内存记录

8.3 PersistentTaskStore

PersistentTaskStore 的特点:

  1. 通过 AGFS 持久化到 /local/...
  2. create / update 都是覆盖写目标 JSON 文件
  3. get 必须提供:
    • account_id
    • user_id
  4. list 当前只支持按单个 user 目录扫描
  5. delete 当前也要求提供 account_id + user_id

8.4 目录创建行为

PersistentTaskStore 在写入前会确保目录存在:

  1. /local/{account_id}
  2. /local/{account_id}/tasks
  3. /local/{account_id}/tasks/{user_id}

当前实现对 mkdir already exists 做了幂等处理,避免重复创建时报错。

9. TaskTracker 当前行为

9.1 创建任务

创建任务时:

  1. 必须提供非空的 account_id
  2. 必须提供非空的 user_id
  3. 先写入内存 _tasks
  4. 再调用 self._store.create(task)

适用接口:

  • create()
  • create_if_no_running()

9.2 更新任务

当前以下接口都支持显式传入:

  • start(task_id, account_id=None, user_id=None)
  • complete(task_id, result=None, account_id=None, user_id=None)
  • fail(task_id, error, account_id=None, user_id=None)

行为分两种:

  1. 如果当前实例内存里已经有这个 task_id

    • 直接命中内存对象
    • 更新内存
    • 同步写回 self._store
  2. 如果当前实例内存里没有这个 task_id

    • 只有在同时提供 account_id + user_id
    • 才会去持久化层回捞
    • 回捞成功后更新并再次写回

这就是“跨实例 / 重启后继续更新同一个 task”的基础。

9.3 查询任务

get(task_id, account_id=None, user_id=None) 行为:

  1. 先看本地 _tasks
  2. 如果本地没有且传了 account_id
    • 尝试从 store 回捞
  3. 最后通过 _matches_owner()account_id + user_id 过滤

9.4 列表与查重

list_tasks()has_running() 在带 account_id 的情况下,会先尝试从 store 加载该范围内任务,再做过滤。

当前在 persistent 模式下,真正的加载粒度已经是:

  • account_id + user_id

因为 PersistentTaskStore.list() 当前只按单 user 目录列举。

10. HTTP 查询语义

10.1 当前 /api/v1/tasks 语义

/api/v1/tasks 当前不是只按 account 过滤,而是同时按:

  • account_id
  • user_id

过滤返回。

路由位于:

  • openviking/server/routers/tasks.py

10.2 当前 /api/v1/tasks/{task_id} 语义

单任务查询同样按:

  • account_id
  • user_id

限制可见性。

也就是说:

  1. 同一 account 下不同 user 默认互相不可见
  2. task 的持久化路径分桶和查询权限语义保持一致

11. 后续异步阶段如何更新同一个 task_id

11.1 当前要求

如果任务后续更新有可能发生在:

  • 另一个实例
  • 当前实例重启之后
  • 当前实例内存缓存丢失之后

那么后续更新代码必须显式携带:

  • task_id
  • account_id
  • user_id

11.2 原因

因为当前持久化路径是:

/local/{account_id}/tasks/{user_id}/{task_id}.json

如果没有 user_id,就无法从持久化层准确定位到文件。

11.3 当前已经补齐的调用链

当前实现里,下面这些异步更新链路都已经显式传递 account_id + user_id

  1. session.commit_async() 后续阶段
  2. ResourceService.add_resource() 队列监控阶段
  3. ResourceService.add_skill() 队列监控阶段
  4. ReindexExecutor._run_tracked()

12. 清理语义

12.1 memory 模式

memory 模式保留原始的 TTL 清理行为:

  1. COMPLETED 保留 24 小时
  2. FAILED 保留 7 天
  3. 每 5 分钟清理一次
  4. 超过 MAX_TASKS=10000 时做 FIFO 淘汰

12.2 persistent 模式

当前一期实现里:

  1. TaskTracker 仍然会清理当前进程内 _tasks 缓存
  2. 但不会删除持久化 JSON 文件

也就是说:

  • 内存缓存会过期
  • 持久化记录不会自动清理

这是当前实现有意保留的简化策略。

13. 当前实现的优点

  1. 对上层调用方改动小,TaskTracker 主接口仍然可用。
  2. memorypersistent 可切换,便于渐进上线。
  3. persistent 模式能满足最基本的跨实例查询诉求。
  4. account_id + user_id 分桶与当前 HTTP 查询语义一致。
  5. 后续异步阶段只要带齐三元组:
    • task_id
    • account_id
    • user_id
      就能继续更新同一任务。

14. 当前实现的限制

  1. 持久化任务不会自动清理,长期运行会累积 JSON 文件。
  2. 如果执行任务的实例中途挂掉,任务不会自动变成 failed
  3. PersistentTaskStore.list() 当前只支持 user 级列举,不支持 account 级聚合枚举。
  4. start/complete/fail 在跨实例场景下必须显式传 user_id,否则无法命中持久化路径。
  5. 当前没有分布式并发保护,多个实例同时更新同一任务时采用简单覆盖写语义。

15. 当前实现总结

当前代码已经落地了一个“一期可用”的持久化 task tracker:

  1. 默认仍是 memory
  2. 打开 persistent 后,可通过:
    • /local/{account_id}/tasks/{user_id}/{task_id}.json
      提供跨实例查询基础
  3. TaskTracker 对外能力保留
  4. TaskStore 负责存储抽象
  5. 后续异步更新链路显式携带 account_id + user_id

这套实现已经满足最核心的集群诉求:

  • 一个实例返回的 task_id
  • 另一个实例可以查到
  • 重启后仍可查
  • 后续阶段还能继续更新到同一个 task_id

但它仍然是一个偏简单的一期实现,未来如果需要更完善的运维能力,还可以继续补:

  • 持久化清理
  • 宕机超时失败标记
  • account 级任务枚举
  • 更严格的并发更新保护

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 9, 2026

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🏅 Score: 85
🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Missing argument in async task creation

The call to _monitor_queue_processing in add_resource is missing the user_id argument, which will cause a TypeError when the coroutine is created.

asyncio.create_task(
    self._monitor_queue_processing(
        task.task_id,
        telemetry_id,
        ctx.account_id,
    )
)

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 9, 2026

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Add missing user_id argument to _monitor_queue_processing call in add_resource

The call to _monitor_queue_processing is missing the user_id argument, which will
cause a TypeError at runtime. Add ctx.user.user_id as the fourth positional argument
to match the method's updated signature.

openviking/service/resource_service.py [304-310]

 asyncio.create_task(
     self._monitor_queue_processing(
         task.task_id,
         telemetry_id,
         ctx.account_id,
+        ctx.user.user_id,
     )
 )
Suggestion importance[1-10]: 8

__

Why: The call to _monitor_queue_processing is missing the user_id argument, which would cause a TypeError at runtime since the method signature was updated to require account_id and user_id. This is a critical bug fix.

Medium

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

Status: Backlog

Development

Successfully merging this pull request may close these issues.

2 participants